package consumer

import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import testjson.phoenix

object Sparkmain {

  Logger.getLogger("com").setLevel(Level.ERROR) // set the log level

  // Path to the external configuration file; a classpath resource is used as a fallback when it does not exist
  val confPath: String = System.getProperty("user.dir") + File.separator + "conf/0327.properties"

  /**
    * A UserInfo record describing a user profile
    */
  case class UserInfo (
                        id: String,
                        name: String,
                        sex: String,
                        city: String,
                        occupation: String,
                        tel: String,
                        fixPhoneNum: String,
                        bankName: String,
                        address: String,
                        marriage: String,
                        childNum: String
                      )
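  // NOTE: UserInfo is not referenced in main() below; presumably it models the JSON payload of the Kafka messages.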

  def main(args: Array[String]): Unit = {
    // Load the configuration: prefer the external file, otherwise fall back to the bundled classpath resource
    val properties = new Properties()
    val file = new File(confPath)
    if (!file.exists()) {
      System.out.println(Sparkmain.getClass.getClassLoader.getResource("0327.properties"))
      val in = Sparkmain.getClass.getClassLoader.getResourceAsStream("0327.properties")
      properties.load(in)
      in.close()
    } else {
      properties.load(new FileInputStream(confPath))
    }
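    // The properties file is expected to provide at least the following keys (values here are illustrative):
    //   kafka.brokers=host1:9092,host2:9092
    //   kafka.topics=topic_0327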

    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    println("kafka.brokers:" + brokers)
    println("kafka.topics:" + topics)

    if (StringUtils.isEmpty(brokers) || StringUtils.isEmpty(topics)) {
      println("Kafka brokers/topics are not configured, exiting...")
      System.exit(0)
    }
    val topicsSet = topics.split(",").toSet

    val spark = SparkSession.builder().appName("Kafka2Spark2Hive-nokerberos").config(new SparkConf()).getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) // 5-second micro-batch interval
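    // Kafka consumer parameters; auto-commit is disabled, so offsets are not committed back to Kafka automatically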
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "auto.offset.reset" -> "latest",
      // "security.protocol" -> "SASL_PLAINTEXT",
      // "sasl.kerberos.service.name" -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "testgroup",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val phoenixs = new phoenix
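    // Create a direct Kafka stream subscribed to the configured topics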
    val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))



    // Extract the message payload (value) from each Kafka record
    val values = dStream.map(record => record.value())

    // phoenixs.JSONphoenix(values)
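    // Print a sample of each batch to stdout (basic sanity check)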
    dStream.print()

    // Bring Spark SQL implicits into scope (needed for the RDD/Dataset conversions below)
    import spark.implicits._
    val a = ssc.sparkContext.makeRDD(List(1, 2, 3, 45, 0)) // leftover test RDD, not used further
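    // Sketch only (an assumption, not part of the original job): if each Kafka message is a
    // JSON-encoded UserInfo record, every micro-batch could be parsed and appended to a table.
    // The table name "user_info" is hypothetical; read.json(Dataset[String]) needs Spark 2.2+,
    // and persisting to an actual Hive metastore would also require .enableHiveSupport() on the SparkSession.
    values.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val df = spark.read.json(rdd.toDS())                     // parse the JSON strings into a DataFrame
        df.write.mode(SaveMode.Append).saveAsTable("user_info")  // append the batch to the table
      }
    }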

    ssc.start()
    ssc.awaitTermination()
  }
}
